package com.atguigu.day10;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink08_SQL_GroupWindow {
    public static void main(String[] args) {
        //1.获取流的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.获取表的执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int, " +
                "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for t as t - interval '5' second)" +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'input/sensor-sql.txt',"
                + "'format' = 'csv'"
                + ")");

        //TODO 使用GroupWindow
        //滚动窗口
//        tableEnv.executeSql("select" +
//                " id," +
//                " sum(vc)," +
//                " tumble_start(t, interval '3' second) as wstart," +
//                " tumble_end(t, interval '3' second) as wend" +
//                " from sensor" +
//                " group by id,tumble(t, interval '3' second)" +
//                "").print();
        //滑动窗口 窗口大小为3s 滑动步长为2s 注意！！！！ 滑动窗口中 三个参数 第一个指的是时间字段， 第二个指的是滑动步长 第三个指的是窗口大小
//        tableEnv.executeSql("select" +
//                " id," +
//                " sum(vc)," +
//                " hop_start(t, interval '2' second,interval '3' second) as wstart," +
//                " hop_end(t, interval '2' second,interval '3' second) as wend" +
//                " from sensor" +
//                " group by id,hop(t, interval '2' second,interval '3' second)" +
//                "").print();
        //会话窗口 会话间隔为2s
        tableEnv.executeSql("select" +
                " id," +
                " sum(vc)," +
                " session_start(t, interval '2' second) as wstart," +
                " session_end(t, interval '2' second) as wend" +
                " from sensor" +
                " group by id,session(t, interval '2' second)" +
                "").print();

    }
}
